Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn][PIP-298] Consumer supports specifying consumption isolation level #21246

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

hzh0425
Copy link
Member

@hzh0425 hzh0425 commented Sep 25, 2023

PIP: #21114

Motivation

Consumer supports specifying consumption isolation level #21114

Modifications

Add a configuration 'SubscriptionIsolationLevel' in consumer builder:

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("persistent://my-tenant/my-namespace/my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionIsolationLevel(IsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
        .subscribe();

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 25, 2023
@hzh0425
Copy link
Member Author

hzh0425 commented Sep 25, 2023

@liangyepianzhou pls cc

@@ -399,6 +399,14 @@ message CommandSubscribe {

// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;

enum IsolationLevel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui Should we increase ProtocolVersion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The broker should only read the isolation type from a client version that has isolation-level support. Otherwise, the broker will get an incompatibility issue.

We should also add an integration test to https://github.com/apache/pulsar/tree/master/tests/integration
And we should also add a compatibility test like https://github.com/apache/pulsar/tree/master/tests/bc_2_6_0

…bscription isolation level is ReadUnCommitted.
@hzh0425
Copy link
Member Author

hzh0425 commented Oct 26, 2023

@codelipenghui

Hi, could you please help with the review as soon as possible? We are currently very urgent
Thank you for your time

*
* @param subscriptionIsolationLevel If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed,
* else if READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted.
* Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be configured with the same IsolationLevel.
Copy link
Contributor

@poorbarcode poorbarcode Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are too long, so the check of check style could not pass. Could you run mvn clean install -Dskiptests first to ensure the check can passed? Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui
Hi, could you please help with the review as soon as possible? We are currently very urgent.
Thank you for your time

Since you are urgent, you should also focus on the failed CIs.

Screenshot 2023-10-26 at 22 30 49

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can set up your IDE by following this doc: https://pulsar.apache.org/contribute/setup-ide/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc of Kafka also can be referred to.

image

Copy link
Contributor

@poorbarcode poorbarcode Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui @hzh0425 @liangyepianzhou

In the current PR, the implementation is different with Kafka:

  • Kafka: the consumer will receive all messages, even if the messages have been aborted.
  • Current PR:
    • the consumer will receive committed messages;
    • the consumer will receive the messages that state transaction on-gonging, once the transaction was aborted, these messages could not be received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have discussed this question in the meeting. At that time, the consensus was that not receiving aborted messages was a better solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original intention of using read uncommitted is to reduce read and write latency, not to read error messages

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are differences between Kafka and Pulsar in filtering the aborted message.

Kafka filters the aborted message on the client side, so if the client uses READ_UNCOMMITED, the client can't feel the ongoing transaction, the client is only able to filter the markers. If we need to filter the abort message on the kafka client, it requires to response of the abortedTransactions data each fetch, which will reduce the consumption performance.

Pulsar has different implements for filter parts, the filter logic is on the Broker side. If it is to reduce read and write latency, it is okay.

But I think reading all messages just like Kafka is beneficial in the following cases:
In the event of issues or during development and testing phases, having access to all messages, including uncommitted ones, can be valuable for debugging and troubleshooting.

@poorbarcode poorbarcode added this to the 3.2.0 milestone Oct 26, 2023
Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. Please address them. Thanks

}

if (properties.containsKey(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)) {
IsolationLevel isolationLevel = IsolationLevel.valueOf(Integer.parseInt(properties.get(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we catch the NumberFormatException here? And please add a test for this case.

protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
if (properties != null) {
properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the subscription was created, the prop SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY can not be modified, right?

If yes, we should prevent modifying it by the API pulsar-admin topics update-subscription-properties. And please add a test for this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, why do we need to persist the isolation level to properties?
It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui

Sorry, why do we need to persist the isolation level to properties?
It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

For example:

  • there are 10 messages
    • 3:0~3:4(committed)
    • 3:5~3:9(transaction on-going)]
  • create a subscription with reading un-committed isolation level
    • the consumer committed 3:0~3~6
  • close all consumers
  • create a new consumer on the old subscription with read committed isolation level, (Highlight) the read position and marked deleted position will go to a strange value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

I agree that the subscription use the isolation type of the first consumer provided. But the isolation level should not be changed. Just like schema.

@@ -160,6 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.pendingAckHandle = new PendingAckHandleDisabled();
}
IS_FENCED_UPDATER.set(this, FALSE);
this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support users using a message that is read from a subscription typed READ_UNCOMMITTED for a new transaction?

  • If false, we should init the component pendingAckHandle typed PendingAckHandleDisabled when the isolationLevel is READ_UNCOMMITTED, right?
  • If yes, how can we skip the recover task for this pending ack?

And please add a test for this case.

@@ -236,6 +240,11 @@ public String getTypeString() {
return "Null";
}

@Override
public IsolationLevel getIsolationLevel() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use @lombok.Getter instead of this method is better?

@@ -228,6 +229,89 @@ public void sortedTest() throws Exception {
log.info("TransactionConsumeTest sortedTest finish.");
}

@Test
public void testConsumeMessageWithDifferentIsolationLevel() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add an E2E test ( do not start & commit transaction by PersistentTopic, just use a producer to do it). and another two test cases:

  • abort this transaction before receiving messages
  • unload topic before receiving message

}

// Now commit the transaction
persistentTopic.endTxn(txnID, TxnAction.COMMIT_VALUE, 0L).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add another test case: abort this transaction before receiving messages?

@@ -45,6 +45,7 @@ public class SubscriptionOption {
private Map<String, String> metadata;
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private CommandSubscribe.IsolationLevel isolationLevel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you modify this variable with final?

import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToLongFunction;

public abstract class AbstractSubscription implements Subscription {
protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
Copy link
Contributor

@poorbarcode poorbarcode Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is a prop for Transaction, we should make this prop name contain the keyword transaction, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for all the system properties, please add __ as a prefix.

protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
if (properties != null) {
Copy link
Contributor

@poorbarcode poorbarcode Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the argument properties is null, you discarded the prop isolationLevel, it is wrong. please add a test for this case, thanks

@poorbarcode poorbarcode added type/feature The PR added a new feature or issue requested a new feature area/transaction labels Oct 26, 2023
@liangyepianzhou liangyepianzhou changed the title [improve][broker/consumer][PIP-298] Consumer supports specifying consumption isolation level [improve][txn][PIP-298] Consumer supports specifying consumption isolation level Oct 27, 2023
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToLongFunction;

public abstract class AbstractSubscription implements Subscription {
protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for all the system properties, please add __ as a prefix.

protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
if (properties != null) {
properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, why do we need to persist the isolation level to properties?
It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

*
* @param subscriptionIsolationLevel If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed,
* else if READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted.
* Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be configured with the same IsolationLevel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc of Kafka also can be referred to.

image

@@ -399,6 +399,14 @@ message CommandSubscribe {

// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;

enum IsolationLevel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The broker should only read the isolation type from a client version that has isolation-level support. Otherwise, the broker will get an incompatibility issue.

We should also add an integration test to https://github.com/apache/pulsar/tree/master/tests/integration
And we should also add a compatibility test like https://github.com/apache/pulsar/tree/master/tests/bc_2_6_0

*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public enum SubscriptionIsolationLevel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use TransactionIsolationLevel. We don't have any concept of SubscriptionIsolationLevel.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/transaction doc-not-needed Your PR changes do not impact docs type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants